{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<i>Copyright (c) Microsoft Corporation. All rights reserved.</i>\n",
    "\n",
    "<i>Licensed under the MIT License.</i>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Building a Real-time Recommendation API\n",
    "\n",
    "This reference architecture shows the full lifecycle of building a recommendation system. It walks through the creation of appropriate azure resources, training a recommendation model using Azure Databricks and deploying it as an API. It uses Azure Cosmos DB, Azure Machine Learning, and Azure Kubernetes Service. \n",
    "\n",
    "This architecture can be generalized for many recommendation engine scenarios, including recommendations for products, movies, and news. \n",
    "\n",
    "### Architecture\n",
    "![architecture](https://recodatasets.blob.core.windows.net/images/reco-arch.png \"Architecture\")\n",
    "\n",
    "**Scenario**: A media organization wants to provide movie or video recommendations to its users. By providing personalized recommendations, the organization meets several business goals, including increased click-through rates, increased engagement on site, and higher user satisfaction.\n",
    "\n",
    "In this reference, we train and deploy a real-time recommender service API that can provide the top 10 movie recommendations for a given user. \n",
    "\n",
    "### Components\n",
    "This architecture consists of the following key components:\n",
    "* [Azure Databricks](https://docs.microsoft.com/en-us/azure/azure-databricks/what-is-azure-databricks) is used as a development environment to prepare input data and train the recommender model on a Spark cluster. Azure Databricks also provides an interactive workspace to run and collaborate on notebooks for any data processing or machine learning tasks. \n",
    "* [Azure Kubernetes Service](https://docs.microsoft.com/en-us/azure/aks/intro-kubernetes)(AKS) is used to deploy and operationalize a machine learning model service API on a Kubernetes cluster. AKS hosts the containerized model, providing scalability that meets throughput requirements, identity and access management, and logging and health monitoring. \n",
    "* [Azure Cosmos DB](https://docs.microsoft.com/en-us/azure/cosmos-db/introduction) is a globally distributed database service used to store the top 10 recommended movies for each user. Azure Cosmos DB is ideal for this scenario as it provides low latency (10 ms at 99th percentile) to read the top recommended items for a given user. \n",
    "* [Azure Machine Learning Service](https://docs.microsoft.com/en-us/azure/machine-learning/service/) is a service used to track and manage machine learning models, and then package and deploy these models to a scalable Azure Kubernetes Service environment.\n",
    "\n",
    "\n",
    "### Table of Contents.\n",
    "0. [File Imports](#0-File-Imports)\n",
    "1. [Service Creation](#1-Service-Creation)\n",
    "2. [Training](#2-Training)\n",
    "3. [Operationalization](#3.-Operationalize-the-Recommender-Service)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Setup\n",
    "\n",
    "This notebook should be run on Azure Databricks. To import this notebook into your Azure Databricks Workspace, see instructions [here](https://docs.azuredatabricks.net/user-guide/notebooks/notebook-manage.html#import-a-notebook).\n",
    "\n",
    "Setup for Azure Databricks should be completed by following the appropriate sections in the repository's [SETUP file](../../SETUP.md). \n",
    "\n",
    "Please note: This notebook **REQUIRES** that you add the dependencies to support operationalization. See the [SETUP file](../../SETUP.md) for details."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 0 File Imports"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "import os\n",
    "import pandas as pd\n",
    "import pprint\n",
    "import shutil\n",
    "import time, timeit\n",
    "import urllib\n",
    "import yaml\n",
    "import json\n",
    "import uuid\n",
    "import matplotlib\n",
    "import matplotlib.pyplot as plt\n",
    "\n",
    "from azure.common.client_factory import get_client_from_cli_profile\n",
    "from azure.mgmt.compute import ComputeManagementClient\n",
    "import azure.mgmt.cosmosdb\n",
    "import azureml.core\n",
    "from azureml.core import Workspace\n",
    "from azureml.core.run import Run\n",
    "from azureml.core.experiment import Experiment\n",
    "from azureml.core.model import Model\n",
    "from azureml.core.image import ContainerImage\n",
    "from azureml.core.compute import AksCompute, ComputeTarget\n",
    "from azureml.core.webservice import Webservice, AksWebservice\n",
    "\n",
    "\n",
    "import pydocumentdb\n",
    "import pydocumentdb.document_client as document_client\n",
    "\n",
    "import pyspark\n",
    "from pyspark.ml.feature import StringIndexer\n",
    "from pyspark.ml.recommendation import ALS\n",
    "from pyspark.sql import Row\n",
    "from pyspark.sql.types import StructType, StructField\n",
    "from pyspark.sql.types import StringType, FloatType, IntegerType, LongType\n",
    "\n",
    "from reco_utils.dataset import movielens\n",
    "from reco_utils.dataset.cosmos_cli import find_collection, read_collection, read_database, find_database\n",
    "from reco_utils.dataset.spark_splitters import spark_random_split\n",
    "from reco_utils.evaluation.spark_evaluation import SparkRatingEvaluation, SparkRankingEvaluation\n",
    "\n",
    "print(\"PySpark version:\", pyspark.__version__)\n",
    "print(\"Azure SDK version:\", azureml.core.VERSION)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 1 Service Creation\n",
    "Modify the **Subscription ID** to the subscription you would like to deploy to.\n",
    "\n",
    "#### Services created by this notebook:\n",
    "1. [Azure ML Service](https://docs.databricks.com/user-guide/libraries.html)\n",
    "1. [Azure Cosmos DB](https://azure.microsoft.com/en-us/services/cosmos-db/)\n",
    "1. [Azure Container Registery](https://docs.microsoft.com/en-us/azure/container-registry/)\n",
    "1. [Azure Container Instances](https://docs.microsoft.com/en-us/azure/container-instances/)\n",
    "1. [Azure Application Insights](https://azure.microsoft.com/en-us/services/monitor/)\n",
    "1. [Azure Storage](https://docs.microsoft.com/en-us/azure/storage/common/storage-account-overview)\n",
    "1. [Azure Key Vault](https://azure.microsoft.com/en-us/services/key-vault/)\n",
    "1. [Azure Kubernetes Service (AKS)](https://azure.microsoft.com/en-us/services/kubernetes-service/)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Select the services names\n",
    "short_uuid = str(uuid.uuid4())[:4]\n",
    "prefix = \"reco\" + short_uuid\n",
    "data = \"mvl\"\n",
    "algo = \"als\"\n",
    "\n",
    "# location to store the secrets file for cosmosdb\n",
    "secrets_path = '/dbfs/FileStore/dbsecrets.json'\n",
    "ws_config_path = '/dbfs/FileStore'\n",
    "\n",
    "# Add your subscription ID\n",
    "subscription_id = \"\"\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Resource group and workspace\n",
    "resource_group = prefix + \"_\" + data\n",
    "workspace_name = prefix + \"_\"+data+\"_aml\"\n",
    "workspace_region = \"westus2\"\n",
    "print(\"Resource group:\", resource_group)\n",
    "\n",
    "# Columns\n",
    "userCol = \"UserId\"\n",
    "itemCol = \"MovieId\"\n",
    "ratingCol = \"Rating\"\n",
    "\n",
    "# CosmosDB\n",
    "location = workspace_region\n",
    "account_name = resource_group + \"-ds-sql\"\n",
    "# account_name for CosmosDB cannot have \"_\" and needs to be less than 31 chars\n",
    "account_name = account_name.replace(\"_\",\"-\")[0:min(31,len(prefix))]\n",
    "DOCUMENTDB_DATABASE = \"recommendations\"\n",
    "DOCUMENTDB_COLLECTION = \"user_recommendations_\" + algo\n",
    "\n",
    "# AzureML\n",
    "history_name = 'spark-ml-notebook'\n",
    "model_name = data+\"-\"+algo+\"-reco.mml\" #NOTE: The name of a asset must be only letters or numerals, not contain spaces, and under 30 characters\n",
    "service_name = data + \"-\" + algo\n",
    "experiment_name = data + \"_\"+ algo +\"_Experiment\"\n",
    "# Name here must be <= 16 chars and only include letters, numbers and \"-\"\n",
    "aks_name = prefix.replace(\"_\",\"-\")[0:min(12,len(prefix))] + '-aks'\n",
    "# add a name for the container\n",
    "container_image_name = '-'.join([data, algo])\n",
    "\n",
    "train_data_path = data + \"Train\"\n",
    "test_data_path = data + \"Test\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 1.1 Import or create the AzureML Workspace. \n",
    "This command will check if the AzureML Workspace exists or not, and will create the workspace if it doesn't exist."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "ws = Workspace.create(name = workspace_name,\n",
    "                      subscription_id = subscription_id,\n",
    "                      resource_group = resource_group, \n",
    "                      location = workspace_region,\n",
    "                      exist_ok=True)\n",
    "\n",
    "# persist the subscription id, resource group name, and workspace name in aml_config/config.json.\n",
    "ws.write_config(ws_config_path)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 1.2 Create a Cosmos DB resource to store recommendation results:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "## explicitly pass subscription_id in case user has multiple subscriptions\n",
    "client = get_client_from_cli_profile(azure.mgmt.cosmosdb.CosmosDB,\n",
    "                                    subscription_id=subscription_id)\n",
    "\n",
    "async_cosmosdb_create = client.database_accounts.create_or_update(\n",
    "    resource_group,\n",
    "    account_name,\n",
    "    {\n",
    "        'location': location,\n",
    "        'locations': [{\n",
    "            'location_name': location\n",
    "        }]\n",
    "    }\n",
    ")\n",
    "account = async_cosmosdb_create.result()\n",
    "\n",
    "my_keys = client.database_accounts.list_keys(\n",
    "    resource_group,\n",
    "    account_name\n",
    ")\n",
    "\n",
    "master_key = my_keys.primary_master_key\n",
    "endpoint = \"https://\" + account_name + \".documents.azure.com:443/\"\n",
    "\n",
    "#db client\n",
    "client = document_client.DocumentClient(endpoint, {'masterKey': master_key})\n",
    "\n",
    "if find_database(client, DOCUMENTDB_DATABASE) == False:\n",
    "    db = client.CreateDatabase({ 'id': DOCUMENTDB_DATABASE })\n",
    "else:\n",
    "    db = read_database(client, DOCUMENTDB_DATABASE)\n",
    "# Create collection options\n",
    "options = {\n",
    "    'offerThroughput': 11000\n",
    "}\n",
    "\n",
    "# Create a collection\n",
    "collection_definition = { 'id': DOCUMENTDB_COLLECTION, 'partitionKey': {'paths': ['/id'],'kind': 'Hash'} }\n",
    "if find_collection(client,DOCUMENTDB_DATABASE,  DOCUMENTDB_COLLECTION) ==False:\n",
    "    collection = client.CreateCollection(db['_self'], collection_definition, options)\n",
    "else:\n",
    "    collection = read_collection(client, DOCUMENTDB_DATABASE, DOCUMENTDB_COLLECTION)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "secrets = {\n",
    "  \"Endpoint\": endpoint,\n",
    "  \"Masterkey\": master_key,\n",
    "  \"Database\": DOCUMENTDB_DATABASE,\n",
    "  \"Collection\": DOCUMENTDB_COLLECTION,\n",
    "  \"Upsert\": \"true\"\n",
    "}\n",
    "with open(secrets_path, \"w\") as file:\n",
    "    json.dump(secrets, file)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 2 Training\n",
    "\n",
    "Next, we will train an [Alternating Least Squares model](https://spark.apache.org/docs/2.2.0/ml-collaborative-filtering.html) is trained using the [MovieLens](https://grouplens.org/datasets/movielens/) dataset."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# top k items to recommend\n",
    "TOP_K = 10\n",
    "\n",
    "# Select MovieLens data size: 100k, 1m, 10m, or 20m\n",
    "MOVIELENS_DATA_SIZE = '100k'"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 2.1 Download the MovieLens dataset"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Note: The DataFrame-based API for ALS currently only supports integers for user and item ids.\n",
    "schema = StructType(\n",
    "    (\n",
    "        StructField(\"UserId\", IntegerType()),\n",
    "        StructField(\"MovieId\", IntegerType()),\n",
    "        StructField(\"Rating\", FloatType()),\n",
    "        StructField(\"Timestamp\", LongType()),\n",
    "    )\n",
    ")\n",
    "\n",
    "data = movielens.load_spark_df(spark, size=MOVIELENS_DATA_SIZE, schema=schema, dbutils=dbutils)\n",
    "data.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 2.2 Split the data into train, test\n",
    "There are several ways of splitting the data: random, chronological, stratified, etc., each of which favors a different real-world evaluation use case. We will split randomly in this example – for more details on which splitter to choose, consult [this guide](https://github.com/Microsoft/Recommenders/blob/master/notebooks/01_data/data_split.ipynb)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "train, test = spark_random_split(data, ratio=0.75, seed=42)\n",
    "print (\"N train\", train.cache().count())\n",
    "print (\"N test\", test.cache().count())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 2.3 Train the ALS model on the training data, and get the top-k recommendations for our testing data\n",
    "\n",
    "To predict movie ratings, we use the rating data in the training set as users' explicit feedback. The hyperparameters used to estimate the model are set based on [this page](http://mymedialite.net/examples/datasets.html).\n",
    "\n",
    "Under most circumstances, you would explore the hyperparameters and choose an optimal set based on some criteria. For additional details on this process, please see additional information in the deep dives [here](../04_model_select_and_optimize/hypertune_spark_deep_dive.ipynb)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "header = {\n",
    "    \"userCol\": \"UserId\",\n",
    "    \"itemCol\": \"MovieId\",\n",
    "    \"ratingCol\": \"Rating\",\n",
    "}\n",
    "\n",
    "\n",
    "als = ALS(\n",
    "    rank=10,\n",
    "    maxIter=15,\n",
    "    implicitPrefs=False,\n",
    "    alpha=0.1,\n",
    "    regParam=0.05,\n",
    "    coldStartStrategy='drop',\n",
    "    nonnegative=True,\n",
    "    **header\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "model = als.fit(train)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In the movie recommendation use case, recommending movies that have been rated by the users do not make sense. Therefore, the rated movies are removed from the recommended items.\n",
    "\n",
    "In order to achieve this, we recommend all movies to all users, and then remove the user-movie pairs that exist in the training dataset."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Get the cross join of all user-item pairs and score them.\n",
    "users = train.select('UserId').distinct()\n",
    "items = train.select('MovieId').distinct()\n",
    "user_item = users.crossJoin(items)\n",
    "dfs_pred = model.transform(user_item)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "dfs_pred.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Remove seen items.\n",
    "dfs_pred_exclude_train = dfs_pred.alias(\"pred\").join(\n",
    "    train.alias(\"train\"),\n",
    "    (dfs_pred['UserId'] == train['UserId']) & (dfs_pred['MovieId'] == train['MovieId']),\n",
    "    how='outer'\n",
    ")\n",
    "\n",
    "top_all = dfs_pred_exclude_train.filter(dfs_pred_exclude_train[\"train.Rating\"].isNull()) \\\n",
    "    .select('pred.' + 'UserId', 'pred.' + 'MovieId', 'pred.' + \"prediction\")\n",
    "\n",
    "top_all.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 2.4 Evaluate how well ALS performs\n",
    "\n",
    "Evaluate model performance using metrics such as Precision@K, Recall@K, [MAP](https://en.wikipedia.org/wiki/Evaluation_measures_\\(information_retrieval\\)) or [nDCG](https://en.wikipedia.org/wiki/Discounted_cumulative_gain). For a full guide on what metrics to evaluate your recommender with, consult [this guide](https://github.com/Microsoft/Recommenders/blob/master/notebooks/03_evaluate/evaluation.ipynb)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "test.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "rank_eval = SparkRankingEvaluation(test, top_all, k = TOP_K, col_user=\"UserId\", col_item=\"MovieId\", \n",
    "                                    col_rating=\"Rating\", col_prediction=\"prediction\", \n",
    "                                    relevancy_method=\"top_k\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Evaluate Ranking Metrics\n",
    "\n",
    "print(\"Model:\\tALS\",\n",
    "      \"Top K:\\t%d\" % rank_eval.k,\n",
    "      \"MAP:\\t%f\" % rank_eval.map_at_k(),\n",
    "      \"NDCG:\\t%f\" % rank_eval.ndcg_at_k(),\n",
    "      \"Precision@K:\\t%f\" % rank_eval.precision_at_k(),\n",
    "      \"Recall@K:\\t%f\" % rank_eval.recall_at_k(), sep='\\n')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Evaluate Rating Metrics\n",
    "\n",
    "prediction = model.transform(test)\n",
    "rating_eval = SparkRatingEvaluation(test, prediction, col_user=\"UserId\", col_item=\"MovieId\", \n",
    "                                    col_rating=\"Rating\", col_prediction=\"prediction\")\n",
    "\n",
    "print(\"Model:\\tALS rating prediction\",\n",
    "      \"RMSE:\\t%.2f\" % rating_eval.rmse(),\n",
    "      \"MAE:\\t%f\" % rating_eval.mae(),\n",
    "      \"Explained variance:\\t%f\" % rating_eval.exp_var(),\n",
    "      \"R squared:\\t%f\" % rating_eval.rsquared(), sep='\\n')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 2.5 Save the model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "model.write().overwrite().save(model_name)\n",
    "model_local = \"file:\" + os.getcwd() + \"/\" + model_name\n",
    "dbutils.fs.cp(model_name, model_local, True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 3. Operationalize the Recommender Service\n",
    "Once the model is built with desirable performance, it will be operationalized to run as a REST endpoint to be utilized by a real time service. We will utilize [Azure Cosmos DB](https://azure.microsoft.com/en-us/services/cosmos-db/), [Azure Machine Learning Service](https://azure.microsoft.com/en-us/services/machine-learning-service/), and [Azure Kubernetes Service](https://docs.microsoft.com/en-us/azure/aks/intro-kubernetes) to operationalize the recommender service."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 3.1 Create a look-up for Recommendations in Cosmos DB\n",
    "\n",
    "First, the Top-10 recommendations for each user as predicted by the model are stored as a lookup table in Cosmos DB. At runtime, the service will return the Top-10 recommendations as precomputed and stored in Cosmos DB:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "with open(secrets_path) as json_data:\n",
    "    writeConfig = json.load(json_data)\n",
    "    recs = model.recommendForAllUsers(10)\n",
    "    recs.withColumn(\"id\",recs[userCol].cast(\"string\")).select(\"id\", \"recommendations.\"+ itemCol)\\\n",
    "    .write.format(\"com.microsoft.azure.cosmosdb.spark\").mode('overwrite').options(**writeConfig).save()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 3.2 Configure Azure Machine Learning\n",
    "\n",
    "Next, Azure Machine Learning Service is used to create a model scoring image and deploy it to Azure Kubernetes Service as a scalable containerized service. To achieve this, a **scoring script** and an **environment config** should be created. The following shows the content of the two files.  \n",
    "\n",
    "In the scoring script, we make a call to Cosmos DB to lookup the top 10 movies to recommend given an input User ID:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "score_sparkml = \"\"\"\n",
    "\n",
    "import json\n",
    "def init(local=False):\n",
    "    global client, collection\n",
    "    try:\n",
    "      # Query them in SQL\n",
    "      import pydocumentdb.document_client as document_client\n",
    "\n",
    "      MASTER_KEY = '{key}'\n",
    "      HOST = '{endpoint}'\n",
    "      DATABASE_ID = \"{database}\"\n",
    "      COLLECTION_ID = \"{collection}\"\n",
    "      database_link = 'dbs/' + DATABASE_ID\n",
    "      collection_link = database_link + '/colls/' + COLLECTION_ID\n",
    "      \n",
    "      client = document_client.DocumentClient(HOST, {'masterKey': MASTER_KEY})\n",
    "      collection = client.ReadCollection(collection_link=collection_link)\n",
    "    except Exception as e:\n",
    "      collection = e\n",
    "def run(input_json):      \n",
    "\n",
    "    try:\n",
    "      import json\n",
    "\n",
    "      id = json.loads(json.loads(input_json)[0])['id']\n",
    "      query = {'query': 'SELECT * FROM c WHERE c.id = \"' + str(id) +'\"' } #+ str(id)\n",
    "\n",
    "      options = {'partitionKey':str(id)}\n",
    "      document_link = 'dbs/{DOCUMENTDB_DATABASE}/colls/{DOCUMENTDB_COLLECTION}/docs/{0}'.format(id)\n",
    "      result = client.ReadDocument(document_link, options);\n",
    "  \n",
    "    except Exception as e:\n",
    "        result = str(e)\n",
    "    return json.dumps(str(result)) #json.dumps({{\"result\":result}})\n",
    "\"\"\"\n",
    "\n",
    "\n",
    "with open(secrets_path) as json_data:\n",
    "    writeConfig = json.load(json_data)\n",
    "    score_sparkml = score_sparkml.replace(\"{key}\",writeConfig['Masterkey']).replace(\"{endpoint}\",writeConfig['Endpoint']).replace(\"{database}\",writeConfig['Database']).replace(\"{collection}\",writeConfig['Collection']).replace(\"{DOCUMENTDB_DATABASE}\",DOCUMENTDB_DATABASE).replace(\"{DOCUMENTDB_COLLECTION}\", DOCUMENTDB_COLLECTION)\n",
    "\n",
    "    exec(score_sparkml)\n",
    "\n",
    "    with open(\"score_sparkml.py\", \"w\") as file:\n",
    "        file.write(score_sparkml)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Next, create a environment config file with the dependencies needed:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "%%writefile myenv_sparkml.yml\n",
    "\n",
    "name: myenv\n",
    "channels:\n",
    "  - defaults\n",
    "dependencies:\n",
    "  - pip:\n",
    "    - numpy==1.14.2\n",
    "    - scikit-learn==0.19.1\n",
    "    - pandas\n",
    "    - azureml-core\n",
    "    - pydocumentdb"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Register your model:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "mymodel = Model.register(model_path = model_name, # this points to a local file\n",
    "                       model_name = model_name, # this is the name the model is registered as, am using same name for both path and name.                 \n",
    "                       description = \"ADB trained model\",\n",
    "                       workspace = ws)\n",
    "\n",
    "print(mymodel.name, mymodel.description, mymodel.version)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 3.3 Deploy the model as a Service on AKS"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### 3.3.1 Create a container for your model service:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Create Image for Web Service\n",
    "models = [mymodel]\n",
    "runtime = \"spark-py\"\n",
    "conda_file = 'myenv_sparkml.yml'\n",
    "driver_file = \"score_sparkml.py\"\n",
    "\n",
    "# image creation\n",
    "from azureml.core.image import ContainerImage\n",
    "myimage_config = ContainerImage.image_configuration(execution_script = driver_file, \n",
    "                                    runtime = runtime, \n",
    "                                    conda_file = conda_file)\n",
    "\n",
    "image = ContainerImage.create(name = container_image_name,\n",
    "                                # this is the model object\n",
    "                                models = [mymodel],\n",
    "                                image_config = myimage_config,\n",
    "                                workspace = ws)\n",
    "\n",
    "# Wait for the create process to complete\n",
    "image.wait_for_creation(show_output = True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### 3.3.2 Create an AKS Cluster to run your container (this may take 20-25 minutes):"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from azureml.core.compute import AksCompute, ComputeTarget\n",
    "\n",
    "# Use the default configuration (can also provide parameters to customize)\n",
    "prov_config = AksCompute.provisioning_configuration()\n",
    "\n",
    "# Create the cluster\n",
    "aks_target = ComputeTarget.create(workspace = ws, \n",
    "                                  name = aks_name, \n",
    "                                  provisioning_configuration = prov_config)\n",
    "\n",
    "aks_target.wait_for_completion(show_output = True)\n",
    "\n",
    "print(aks_target.provisioning_state)\n",
    "print(aks_target.provisioning_errors)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### 3.3.3 Deploy the container image to AKS:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "#Set the web service configuration (using default here with app insights)\n",
    "aks_config = AksWebservice.deploy_configuration(enable_app_insights=True)\n",
    "\n",
    "# Webservice creation using single command, there is a variant to use image directly as well.\n",
    "try:\n",
    "    aks_service = Webservice.deploy_from_image(\n",
    "      workspace=ws, \n",
    "      name=service_name,\n",
    "      deployment_config = aks_config,\n",
    "      image = image,\n",
    "      deployment_target = aks_target\n",
    "      )\n",
    "    aks_service.wait_for_deployment(show_output=True)\n",
    "except Exception:\n",
    "    aks_service = Webservice.list(ws)[0]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 3.4 Call the AKS model service\n",
    "After the deployment, the service can be called with a user ID – the service will then look up the top 10 recommendations for that user in Cosmos DB and send back the results.\n",
    "The following script demonstrates how to call the recommendation service API and view the result for the given user ID:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "scoring_url = aks_service.scoring_uri\n",
    "service_key = aks_service.get_keys()[0]\n",
    "\n",
    "input_data = '[\"{\\\\\"id\\\\\":\\\\\"496\\\\\"}\"]'.encode()\n",
    "\n",
    "req = urllib.request.Request(scoring_url,data=input_data)\n",
    "req.add_header(\"Authorization\",\"Bearer {}\".format(service_key))\n",
    "req.add_header(\"Content-Type\",\"application/json\")\n",
    "\n",
    "tic = time.time()\n",
    "with urllib.request.urlopen(req) as result:\n",
    "    res = result.readlines()\n",
    "    print(res)\n",
    "    \n",
    "toc = time.time()\n",
    "t2 = toc - tic\n",
    "print(\"Full run took %.2f seconds\" % (toc - tic))"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python [conda env:Anaconda3]",
   "language": "python",
   "name": "conda-env-Anaconda3-py"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.8"
  },
  "name": "ALS_Movie_Example",
  "notebookId": 3793436040750096
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
